{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create Hive schema for Parquet files"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook demonstrates how to create a Hive table for existing Parquet files. <br>\n",
    "You can do this for a single file or for multiple files that reside in the same folder."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create a Spark session with Hive support"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder.appName(\"Import parquet schema to hive\").config(\"hive.metastore.uris\", \"thrift://hive:9083\").enableHiveSupport().getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a function that builds the SQL script for creating the Hive table, using the DataFrame column names and types as the table columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def getCreateTableScript(databaseName, tableName, path, df):\n",
    "    # df.dtypes is a list of (column name, type string) pairs\n",
    "    cols = df.dtypes\n",
    "    createScript = \"CREATE EXTERNAL TABLE IF NOT EXISTS \" + databaseName + \".\" + tableName + \"(\"\n",
    "    colArray = []\n",
    "    for colName, colType in cols:\n",
    "        # Replace spaces in column names with underscores\n",
    "        colArray.append(colName.replace(\" \", \"_\") + \" \" + colType)\n",
    "    createColsScript = \", \".join(colArray)\n",
    "\n",
    "    script = createScript + createColsScript + \") STORED AS PARQUET LOCATION '\" + path + \"'\"\n",
    "    print(script)\n",
    "    return script"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Main function for creating a table; the 'path' argument is the path to the Parquet file(s)\n",
    "def createTable(databaseName, tableName, path):\n",
    "    df = spark.read.parquet(path)\n",
    "    sqlScript = getCreateTableScript(databaseName, tableName, path, df)\n",
    "    spark.sql(sqlScript)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Single-file example"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.tab1_single_file(registration_dttm timestamp, id int, first_name string, last_name string, email string, gender string, ip_address string, cc string, country string, birthdate string, salary double, title string, comments string) STORED AS PARQUET LOCATION 'v3io://users/adi/examples/userdata1.parquet'\n"
     ]
    }
    ],
   "source": [
    "# Set the path to the Parquet file.\n",
    "my_parquet_file_path = 'v3io://users/' + os.getenv('V3IO_USERNAME') + '/examples/userdata1.parquet'\n",
    "\n",
    "createTable(\"default\", \"tab1_single_file\", my_parquet_file_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Single-folder example (Spark job output)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CREATE EXTERNAL TABLE IF NOT EXISTS default.table_from_dir(registration_dttm timestamp, id int, first_name string, last_name string, email string, gender string, ip_address string, cc string, country string, birthdate string, salary double, title string, comments string) STORED AS PARQUET LOCATION 'v3io://users/adi/examples/spark-output/*'\n"
     ]
    }
    ],
   "source": [
    "# Set the path to the folder that contains the Parquet files.\n",
    "folder_path = 'v3io://users/' + os.getenv('V3IO_USERNAME') + '/examples/spark-output/*'\n",
    "\n",
    "createTable(\"default\", \"table_from_dir\", folder_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Multiple files and folders example\n",
    "\n",
    "In this example, change the database name and the path to the folder that contains the Parquet files (or folders of Parquet files). <br>\n",
    "The code iterates over all files and directories in the provided path and creates a table for each of them. <br>\n",
    "File names must end with .parquet. <br>\n",
    "Directories that contain Parquet files must not start with \".\" (entries that start with \".\" are skipped). <br>\n",
    "The name of the file (without the .parquet extension) or of the directory is used as the table name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {
    "collapsed": true,
    "jupyter": {
     "outputs_hidden": true
    }
   },
   "outputs": [],
   "source": [
    "databaseName = \"default\"\n",
    "# os.listdir needs a local file-system path, so use the /v3io mount that the replace() call below expects\n",
    "filepath = os.path.join('/v3io/users', os.getenv('V3IO_USERNAME'), 'examples/multiple-parquet-files')\n",
    "\n",
    "for fileOrDir in os.listdir(filepath):\n",
    "    # Spark reads through the v3io:// URL, so convert the local mount prefix\n",
    "    sparkPath = filepath.replace(\"/v3io/\", \"v3io://\", 1) + \"/\" + fileOrDir\n",
    "    if fileOrDir.endswith(\".parquet\"):\n",
    "        createTable(databaseName, fileOrDir.split(\".parquet\")[0], sparkPath)\n",
    "    elif not fileOrDir.startswith(\".\"):\n",
    "        createTable(databaseName, fileOrDir, sparkPath + \"/*\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Verify the created tables"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+\n",
      "|databaseName|\n",
      "+------------+\n",
      "|     default|\n",
      "+------------+\n",
      "\n",
      "+--------+----------------+-----------+\n",
      "|database|       tableName|isTemporary|\n",
      "+--------+----------------+-----------+\n",
      "| default|tab1_single_file|      false|\n",
      "| default|  table_from_dir|      false|\n",
      "+--------+----------------+-----------+\n",
      "\n"
     ]
    }
    ],
   "source": [
    "# List the databases and the tables that were created\n",
    "#spark.sql(\"drop database test CASCADE\")\n",
    "databaseName = \"default\"\n",
    "\n",
    "spark.sql(\"show databases\").show()\n",
    "spark.sql(\"show tables in \" + databaseName).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[registration_dttm: timestamp, id: int, first_name: string, last_name: string, email: string, gender: string, ip_address: string, cc: string, country: string, birthdate: string, salary: double, title: string, comments: string]"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
    ],
   "source": [
    "# Query the table that was created from the folder\n",
    "tableName = \"table_from_dir\"\n",
    "spark.sql(\"select * from \" + databaseName + \".\" + tableName)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Run select via Hive"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Done.\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "    <tr>\n",
       "        <th>registration_dttm</th>\n",
       "        <th>id</th>\n",
       "        <th>first_name</th>\n",
       "        <th>last_name</th>\n",
       "        <th>email</th>\n",
       "        <th>gender</th>\n",
       "        <th>ip_address</th>\n",
       "        <th>cc</th>\n",
       "        <th>country</th>\n",
       "        <th>birthdate</th>\n",
       "        <th>salary</th>\n",
       "        <th>title</th>\n",
       "        <th>comments</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 07:55:29.000</td>\n",
       "        <td>1</td>\n",
       "        <td>Amanda</td>\n",
       "        <td>Jordan</td>\n",
       "        <td>ajordan0@com.com</td>\n",
       "        <td>Female</td>\n",
       "        <td>1.197.201.2</td>\n",
       "        <td>6759521864920116</td>\n",
       "        <td>Indonesia</td>\n",
       "        <td>3/8/1971</td>\n",
       "        <td>49756.53</td>\n",
       "        <td>Internal Auditor</td>\n",
       "        <td>1E+02</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 06:47:06.000</td>\n",
       "        <td>8</td>\n",
       "        <td>Harry</td>\n",
       "        <td>Howell</td>\n",
       "        <td>hhowell7@eepurl.com</td>\n",
       "        <td>Male</td>\n",
       "        <td>91.235.51.73</td>\n",
       "        <td></td>\n",
       "        <td>Bosnia and Herzegovina</td>\n",
       "        <td>3/1/1962</td>\n",
       "        <td>186469.43</td>\n",
       "        <td>Web Developer IV</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 03:52:53.000</td>\n",
       "        <td>9</td>\n",
       "        <td>Jose</td>\n",
       "        <td>Foster</td>\n",
       "        <td>jfoster8@yelp.com</td>\n",
       "        <td>Male</td>\n",
       "        <td>132.31.53.61</td>\n",
       "        <td></td>\n",
       "        <td>South Korea</td>\n",
       "        <td>3/27/1992</td>\n",
       "        <td>231067.84</td>\n",
       "        <td>Software Test Engineer I</td>\n",
       "        <td>1E+02</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 18:29:47.000</td>\n",
       "        <td>10</td>\n",
       "        <td>Emily</td>\n",
       "        <td>Stewart</td>\n",
       "        <td>estewart9@opensource.org</td>\n",
       "        <td>Female</td>\n",
       "        <td>143.28.251.245</td>\n",
       "        <td>3574254110301671</td>\n",
       "        <td>Nigeria</td>\n",
       "        <td>1/28/1997</td>\n",
       "        <td>27234.28</td>\n",
       "        <td>Health Coach IV</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 00:36:21.000</td>\n",
       "        <td>4</td>\n",
       "        <td>Denise</td>\n",
       "        <td>Riley</td>\n",
       "        <td>driley3@gmpg.org</td>\n",
       "        <td>Female</td>\n",
       "        <td>140.35.109.83</td>\n",
       "        <td>3576031598965625</td>\n",
       "        <td>China</td>\n",
       "        <td>4/8/1997</td>\n",
       "        <td>90263.05</td>\n",
       "        <td>Senior Cost Accountant</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 05:05:31.000</td>\n",
       "        <td>5</td>\n",
       "        <td>Carlos</td>\n",
       "        <td>Burns</td>\n",
       "        <td>cburns4@miitbeian.gov.cn</td>\n",
       "        <td></td>\n",
       "        <td>169.113.235.40</td>\n",
       "        <td>5602256255204850</td>\n",
       "        <td>South Africa</td>\n",
       "        <td></td>\n",
       "        <td>None</td>\n",
       "        <td></td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 07:22:34.000</td>\n",
       "        <td>6</td>\n",
       "        <td>Kathryn</td>\n",
       "        <td>White</td>\n",
       "        <td>kwhite5@google.com</td>\n",
       "        <td>Female</td>\n",
       "        <td>195.131.81.179</td>\n",
       "        <td>3583136326049310</td>\n",
       "        <td>Indonesia</td>\n",
       "        <td>2/25/1983</td>\n",
       "        <td>69227.11</td>\n",
       "        <td>Account Executive</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 08:33:08.000</td>\n",
       "        <td>7</td>\n",
       "        <td>Samuel</td>\n",
       "        <td>Holmes</td>\n",
       "        <td>sholmes6@foxnews.com</td>\n",
       "        <td>Male</td>\n",
       "        <td>232.234.81.197</td>\n",
       "        <td>3582641366974690</td>\n",
       "        <td>Portugal</td>\n",
       "        <td>12/18/1987</td>\n",
       "        <td>14247.62</td>\n",
       "        <td>Senior Financial Analyst</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 17:04:03.000</td>\n",
       "        <td>2</td>\n",
       "        <td>Albert</td>\n",
       "        <td>Freeman</td>\n",
       "        <td>afreeman1@is.gd</td>\n",
       "        <td>Male</td>\n",
       "        <td>218.111.175.34</td>\n",
       "        <td></td>\n",
       "        <td>Canada</td>\n",
       "        <td>1/16/1968</td>\n",
       "        <td>150280.17</td>\n",
       "        <td>Accountant IV</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "        <td>2016-02-03 01:09:31.000</td>\n",
       "        <td>3</td>\n",
       "        <td>Evelyn</td>\n",
       "        <td>Morgan</td>\n",
       "        <td>emorgan2@altervista.org</td>\n",
       "        <td>Female</td>\n",
       "        <td>7.161.136.94</td>\n",
       "        <td>6767119071901597</td>\n",
       "        <td>Russia</td>\n",
       "        <td>2/1/1960</td>\n",
       "        <td>144972.51</td>\n",
       "        <td>Structural Engineer</td>\n",
       "        <td></td>\n",
       "    </tr>\n",
       "</table>"
      ],
      "text/plain": [
       "[('2016-02-03 07:55:29.000', 1, 'Amanda', 'Jordan', 'ajordan0@com.com', 'Female', '1.197.201.2', '6759521864920116', 'Indonesia', '3/8/1971', 49756.53, 'Internal Auditor', '1E+02'),\n",
       " ('2016-02-03 06:47:06.000', 8, 'Harry', 'Howell', 'hhowell7@eepurl.com', 'Male', '91.235.51.73', '', 'Bosnia and Herzegovina', '3/1/1962', 186469.43, 'Web Developer IV', ''),\n",
       " ('2016-02-03 03:52:53.000', 9, 'Jose', 'Foster', 'jfoster8@yelp.com', 'Male', '132.31.53.61', '', 'South Korea', '3/27/1992', 231067.84, 'Software Test Engineer I', '1E+02'),\n",
       " ('2016-02-03 18:29:47.000', 10, 'Emily', 'Stewart', 'estewart9@opensource.org', 'Female', '143.28.251.245', '3574254110301671', 'Nigeria', '1/28/1997', 27234.28, 'Health Coach IV', ''),\n",
       " ('2016-02-03 00:36:21.000', 4, 'Denise', 'Riley', 'driley3@gmpg.org', 'Female', '140.35.109.83', '3576031598965625', 'China', '4/8/1997', 90263.05, 'Senior Cost Accountant', ''),\n",
       " ('2016-02-03 05:05:31.000', 5, 'Carlos', 'Burns', 'cburns4@miitbeian.gov.cn', '', '169.113.235.40', '5602256255204850', 'South Africa', '', None, '', ''),\n",
       " ('2016-02-03 07:22:34.000', 6, 'Kathryn', 'White', 'kwhite5@google.com', 'Female', '195.131.81.179', '3583136326049310', 'Indonesia', '2/25/1983', 69227.11, 'Account Executive', ''),\n",
       " ('2016-02-03 08:33:08.000', 7, 'Samuel', 'Holmes', 'sholmes6@foxnews.com', 'Male', '232.234.81.197', '3582641366974690', 'Portugal', '12/18/1987', 14247.62, 'Senior Financial Analyst', ''),\n",
       " ('2016-02-03 17:04:03.000', 2, 'Albert', 'Freeman', 'afreeman1@is.gd', 'Male', '218.111.175.34', '', 'Canada', '1/16/1968', 150280.17, 'Accountant IV', ''),\n",
       " ('2016-02-03 01:09:31.000', 3, 'Evelyn', 'Morgan', 'emorgan2@altervista.org', 'Female', '7.161.136.94', '6767119071901597', 'Russia', '2/1/1960', 144972.51, 'Structural Engineer', '')]"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "%sql select * from hive.default.tab1_single_file limit 10"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Access Hive from command line"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To run Hive from the command line, open a Jupyter terminal and run \"hive\". <br>\n",
    "To view all existing Hive tables, run: show tables; <br>\n",
    "Here you can run queries without the \"hive.\" prefix on the table name. <br>\n",
    "For example: select * from tab1_single_file;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql(\"drop table \" + databaseName + \".tab1_single_file\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "DataFrame[]"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark.sql(\"drop table \" + databaseName + \".table_from_dir\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
